package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.Subscription;

/**
 * A {@link Subscription} that keeps an ordered list of {@link MessagePointer}s
 * for one consumer and hands the referenced messages out on request.
 *
 * <p>Methods that read or modify the pointer list are {@code synchronized} on
 * this instance.
 */
public class SubscriptionImpl
  implements Subscription
{
  private static final Log log = LogFactory.getLog(SubscriptionImpl.class);
  private String clientId;
  private String subscriberName;
  private ActiveMQDestination destination;
  private String selector;
  private int prefetchLimit;
  private boolean noLocal;
  private boolean active;
  private int consumerNumber;
  private String consumerId;
  private boolean browser;
  protected Dispatcher dispatch;
  private MessageIdentity lastMessageIdentity;
  Filter filter;
  /** Number of messages handed out by the prefetch path and not yet consumed. */
  protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
  /** Pointers to the messages queued for this subscription, in arrival order. */
  QueueList messagePtrs = new DefaultQueueList();
  // Never assigned anywhere in this class, so the prefetch path is currently disabled.
  private boolean usePrefetch = false;

  /**
   * Creates a subscription bound to the given dispatcher and filter.
   *
   * @param dispatcher dispatcher woken up whenever this subscription changes
   * @param info       consumer description to copy settings from; may be {@code null}
   * @param filter     filter used by {@link #isTarget(ActiveMQMessage)}
   */
  public SubscriptionImpl(Dispatcher dispatcher, ConsumerInfo info, Filter filter)
  {
    this.dispatch = dispatcher;
    this.filter = filter;
    setActiveConsumer(info);
  }

  /**
   * Copies the consumer settings from {@code info} into this subscription.
   * A {@code null} argument leaves the current settings untouched.
   *
   * @param info consumer description, or {@code null}
   */
  public void setActiveConsumer(ConsumerInfo info)
  {
    if (info == null) {
      return;
    }
    this.clientId = info.getClientId();
    this.subscriberName = info.getConsumerName();
    this.noLocal = info.isNoLocal();
    this.destination = info.getDestination();
    this.selector = info.getSelector();
    this.prefetchLimit = info.getPrefetchNumber();
    this.consumerNumber = info.getConsumerNo();
    this.consumerId = info.getConsumerId();
    this.browser = info.isBrowser();
  }

  /**
   * @return a description containing the identity hash, consumer id, client id,
   *         subscriber name and destination
   */
  public String toString()
  {
    return "SubscriptionImpl(" + super.hashCode() + ")[" + this.consumerId + "]" + this.clientId + ": " + this.subscriberName + " : " + this.destination;
  }

  /**
   * Clears every message pointer and then empties the pointer list.
   *
   * @throws JMSException if a pointer or the list fails to clear
   */
  public synchronized void clear()
    throws JMSException
  {
    for (QueueListEntry entry = this.messagePtrs.getFirstEntry();
         entry != null;
         entry = this.messagePtrs.getNextEntry(entry)) {
      ((MessagePointer)entry.getElement()).clear();
    }
    this.messagePtrs.clear();
  }

  /**
   * Resets the leading run of dispatched pointers. The walk stops at the first
   * pointer that is not marked as dispatched.
   *
   * @throws JMSException if a pointer fails to reset
   */
  public synchronized void reset()
    throws JMSException
  {
    QueueListEntry entry = this.messagePtrs.getFirstEntry();
    while (entry != null) {
      MessagePointer pointer = (MessagePointer)entry.getElement();
      if (!pointer.isDispatched()) {
        break;
      }
      pointer.reset();
      entry = this.messagePtrs.getNextEntry(entry);
    }
  }

  /** @return the client id of the consumer */
  public String getClientId()
  {
    return this.clientId;
  }

  /** @param clientId the client id of the consumer */
  public void setClientId(String clientId)
  {
    this.clientId = clientId;
  }

  /** @return the filter used to select messages */
  public Filter getFilter()
  {
    return this.filter;
  }

  /** @param filter the filter used to select messages */
  public void setFilter(Filter filter)
  {
    this.filter = filter;
  }

  /** @return whatever the current filter reports from {@code isWildcard()} */
  public boolean isWildcard() {
    return this.filter.isWildcard();
  }

  /** @return always {@code null} in this implementation */
  public String getPersistentKey()
  {
    return null;
  }

  /**
   * Tells whether {@code info} describes the same durable topic subscription.
   *
   * @param info consumer description to compare against; {@code null} never matches
   * @return {@code true} when this is a durable topic subscription and both the
   *         client id and the consumer name are equal
   * @throws JMSException declared by the interface
   */
  public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException {
    if (info == null || !isDurableTopic()) {
      return false;
    }
    return equal(this.clientId, info.getClientId()) && equal(this.subscriberName, info.getConsumerName());
  }

  /** @return the noLocal flag */
  public boolean isNoLocal()
  {
    return this.noLocal;
  }

  /** @param noLocal the noLocal flag */
  public void setNoLocal(boolean noLocal)
  {
    this.noLocal = noLocal;
  }

  /** @return the subscriber (consumer) name */
  public String getSubscriberName()
  {
    return this.subscriberName;
  }

  /** @param subscriberName the subscriber (consumer) name */
  public void setSubscriberName(String subscriberName)
  {
    this.subscriberName = subscriberName;
  }

  /**
   * Decides whether a message should be delivered to this subscription.
   *
   * @param message candidate message; {@code null} is never a target
   * @return {@code true} when the filter matches and, if noLocal is set, the
   *         message does not come from this subscription's own client
   * @throws JMSException if the filter fails to evaluate
   */
  public boolean isTarget(ActiveMQMessage message)
    throws JMSException
  {
    if (message == null) {
      return false;
    }
    boolean matched = this.filter.matches(message);
    if (matched && this.noLocal && clientIDsEqual(message)) {
      return false;
    }
    return matched;
  }

  /**
   * Appends a pointer to {@code message} to this subscription, wakes the
   * dispatcher and records the message identity as the last one seen.
   *
   * @param container container the message can later be fetched from
   * @param message   message to queue
   * @throws JMSException if the dispatcher wake-up fails
   */
  public synchronized void addMessage(MessageContainer container, ActiveMQMessage message)
    throws JMSException
  {
    if (log.isDebugEnabled()) {
      log.debug("Adding to subscription: " + this + " message: " + message);
    }
    MessagePointer pointer = new MessagePointer(container, message.getJMSMessageIdentity());

    this.messagePtrs.add(pointer);
    this.dispatch.wakeup(this);
    this.lastMessageIdentity = message.getJMSMessageIdentity();
  }

  /**
   * Handles an acknowledgement by removing pointers up to and including the
   * acknowledged message.
   *
   * @param ack the acknowledgement
   * @throws JMSException if consuming fails
   */
  public synchronized void messageConsumed(MessageAck ack)
    throws JMSException
  {
    doMessageConsume(ack, true);
  }

  /**
   * Handles a transacted acknowledgement before commit; pointers stay in the list.
   *
   * @param ack the acknowledgement
   * @throws JMSException if consuming fails
   */
  public synchronized void onAcknowledgeTransactedMessageBeforeCommit(MessageAck ack) throws JMSException {
    doMessageConsume(ack, false);
  }

  /**
   * Marks the pointer whose message id equals the ack's message id for
   * redelivery: increments the unconsumed counter, resets the pointer and wakes
   * the dispatcher. Does nothing when no pointer matches.
   *
   * @param container not used by this implementation
   * @param ack       acknowledgement naming the message to redeliver
   * @throws JMSException if the pointer reset or the wake-up fails
   */
  public synchronized void redeliverMessage(MessageContainer container, MessageAck ack) throws JMSException {
    QueueListEntry entry = this.messagePtrs.getFirstEntry();
    while (entry != null) {
      MessagePointer pointer = (MessagePointer)entry.getElement();
      if (pointer.getMessageIdentity().getMessageID().equals(ack.getMessageID())) {
        this.unconsumedMessagesDispatched.increment();
        pointer.reset();
        this.dispatch.wakeup(this);
        return;
      }
      entry = this.messagePtrs.getNextEntry(entry);
    }
  }

  /**
   * Collects every message that has not been dispatched yet and marks its
   * pointer as dispatched. Pointers whose message can no longer be loaded from
   * the container are dropped from the list.
   *
   * @return the messages to send, possibly empty, never {@code null}
   * @throws JMSException if a container lookup fails
   */
  public synchronized ActiveMQMessage[] getMessagesToDispatch()
    throws JMSException
  {
    if (this.usePrefetch) {
      return getMessagesWithPrefetch();
    }
    return collectUndispatched(Integer.MAX_VALUE, false);
  }

  /**
   * Same as {@link #getMessagesToDispatch()} but returns at most
   * {@code prefetchLimit - unconsumedMessagesDispatched} messages and counts
   * each returned message as unconsumed.
   *
   * @return the messages to send, possibly empty, never {@code null}
   * @throws JMSException if a container lookup fails
   */
  protected synchronized ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
    int maxNumberToDispatch = this.prefetchLimit - this.unconsumedMessagesDispatched.get();
    return collectUndispatched(maxNumberToDispatch, true);
  }

  /**
   * Walks the pointer list and gathers up to {@code limit} undispatched messages.
   *
   * @param limit           maximum number of messages to gather
   * @param trackUnconsumed whether each gathered message increments
   *                        {@link #unconsumedMessagesDispatched}
   */
  private ActiveMQMessage[] collectUndispatched(int limit, boolean trackUnconsumed) throws JMSException {
    List collected = new ArrayList();
    QueueListEntry entry = this.messagePtrs.getFirstEntry();
    while ((entry != null) && (collected.size() < limit)) {
      // Capture the successor before any removal so the walk never depends on a
      // removed entry, nor on stepping back to a (possibly absent) previous entry.
      QueueListEntry next = this.messagePtrs.getNextEntry(entry);
      MessagePointer pointer = (MessagePointer)entry.getElement();
      if (!pointer.isDispatched()) {
        ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
        if (msg != null) {
          pointer.setDispatched(true);
          collected.add(msg);
          if (trackUnconsumed) {
            this.unconsumedMessagesDispatched.increment();
          }
        }
        else {
          // The container no longer has the message; log its identity (the
          // message itself is null here) and drop the stale pointer.
          log.info("Message probably expired: " + pointer.getMessageIdentity());
          this.messagePtrs.remove(entry);
        }
      }
      entry = next;
    }
    return (ActiveMQMessage[])collected.toArray(new ActiveMQMessage[collected.size()]);
  }

  /**
   * Reports whether the prefetch limit has been reached. Always {@code false}
   * while prefetching is disabled.
   *
   * @return {@code true} when list size minus the unconsumed counter is at
   *         least the prefetch limit
   * @throws JMSException declared by the interface
   */
  public synchronized boolean isAtPrefetchLimit()
    throws JMSException
  {
    if (!this.usePrefetch) {
      return false;
    }
    // NOTE(review): this compares (queued - unconsumed) against the limit rather
    // than the unconsumed count itself; kept as-is, confirm the intended formula.
    int undeliveredMessageCount = this.messagePtrs.size() - this.unconsumedMessagesDispatched.get();
    return undeliveredMessageCount >= this.prefetchLimit;
  }

  /**
   * @return {@code true} when the subscription is active and its pointer list
   *         is not empty
   * @throws JMSException declared by the interface
   */
  public synchronized boolean isReadyToDispatch()
    throws JMSException
  {
    return this.active && (this.messagePtrs.size() > 0);
  }

  /** @return the destination subscribed to */
  public ActiveMQDestination getDestination()
  {
    return this.destination;
  }

  /** @return the selector text supplied by the consumer */
  public String getSelector()
  {
    return this.selector;
  }

  /** @return whether the subscription is currently active */
  public synchronized boolean isActive()
  {
    return this.active;
  }

  /**
   * Changes the active flag; deactivating also calls {@link #reset()}.
   *
   * @param active new state
   * @throws JMSException if the reset fails
   */
  public synchronized void setActive(boolean active)
    throws JMSException
  {
    this.active = active;
    if (!active) {
      reset();
    }
  }

  /** @return the consumer number */
  public int getConsumerNumber()
  {
    return this.consumerNumber;
  }

  /** @return the consumer id */
  public String getConsumerId()
  {
    return this.consumerId;
  }

  /**
   * @return {@code true} when the destination is a topic and a non-empty
   *         subscriber name is set
   * @throws JMSException declared by the interface
   */
  public boolean isDurableTopic()
    throws JMSException
  {
    return this.destination.isTopic() && (this.subscriberName != null) && (this.subscriberName.length() > 0);
  }

  /**
   * @return the browser flag copied from the consumer info
   * @throws JMSException declared by the interface
   */
  public boolean isBrowser()
    throws JMSException
  {
    return this.browser;
  }

  /**
   * @return the identity of the last message recorded for this subscription
   * @throws JMSException declared by the interface
   */
  public MessageIdentity getLastMessageIdentity() throws JMSException {
    return this.lastMessageIdentity;
  }

  /**
   * @param messageIdentity identity to record as the last message
   * @throws JMSException declared by the interface
   */
  public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
    this.lastMessageIdentity = messageIdentity;
  }

  /**
   * Walks the pointer list from the front up to and including the pointer whose
   * identity equals the ack's identity, then wakes the dispatcher.
   *
   * <p>When {@code remove} is set, each visited pointer is removed from the
   * list, deleted from its container if the ack reports the message as read and
   * this is not a browser, and the unconsumed counter is decremented unless the
   * ack is part of a transaction. When {@code remove} is not set, only the
   * matching pointer decrements the counter, and only for transacted acks.
   *
   * @param ack    the acknowledgement
   * @param remove whether visited pointers are removed
   * @throws JMSException if a delete or the wake-up fails
   */
  protected synchronized void doMessageConsume(MessageAck ack, boolean remove)
    throws JMSException
  {
    boolean found = false;
    QueueListEntry entry = this.messagePtrs.getFirstEntry();
    while (entry != null) {
      // Look up the successor first: the current entry may be removed below.
      QueueListEntry next = this.messagePtrs.getNextEntry(entry);
      MessagePointer pointer = (MessagePointer)entry.getElement();
      if (remove) {
        this.messagePtrs.remove(entry);
        if (ack.isMessageRead() && !this.browser) {
          pointer.delete(ack);
        }
        if (!ack.isPartOfTransaction()) {
          this.unconsumedMessagesDispatched.decrement();
        }
      }
      if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
        if (!remove && ack.isPartOfTransaction()) {
          this.unconsumedMessagesDispatched.decrement();
        }
        found = true;
        break;
      }
      entry = next;
    }
    if (!found) {
      log.warn("Did not find a matching message for identity: " + ack.getMessageIdentity());
    }
    this.dispatch.wakeup(this);
  }

  /**
   * Tells whether a message originates from this subscription's client.
   *
   * @param message message to inspect
   * @return {@code true} when the message's producer id or its JMS client id
   *         equals this subscription's client id (two {@code null} client ids
   *         count as equal)
   */
  protected boolean clientIDsEqual(ActiveMQMessage message) {
    String msgClientID = message.getJMSClientID();
    String producerClientID = message.getProducerID();
    if ((producerClientID != null) && producerClientID.equals(this.clientId)) {
      return true;
    }
    return equal(msgClientID, this.clientId);
  }

  /**
   * Null-safe equality.
   *
   * @return {@code true} when both references are identical (including both
   *         {@code null}) or {@code left.equals(right)}
   */
  protected static final boolean equal(Object left, Object right)
  {
    return (left == right) || ((left != null) && (right != null) && (left.equals(right)));
  }
}